服务器端主程序
package net.java2000.project.chat;
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
*
* @author 老紫竹 www.java2000.net
*
*/
public class ChatServer {
boolean started = false;
ServerSocket ss = null;
private Set<ChatServerThread> threadSet = new HashSet<ChatServerThread>();
private BlockingDeque<String> deque = new LinkedBlockingDeque<String>(5);
public void removeThread(ChatServerThread thread) {
threadSet.remove(thread);
}
public void addMessage(String msg) {
try {
System.out.println("<< " + msg);
deque.putFirst(msg);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
new ChatServer().start();
}
public void start() {
try {
ss = new ServerSocket(7000);
started = true;
} catch (BindException e) {
System.out.println("无法启动服务器:端口被占用");
} catch (IOException e) {
System.out.println("无法启动服务器:" + e.getLocalizedMessage());
}
try {
Thread thread = null;
new ChatServerBroadcast(threadSet, deque).start();
while (started) {
Socket socket = ss.accept();
System.out.println("a client connected");
ChatServerThread c = new ChatServerThread(this, socket);
thread = new Thread(c);
c.setThreadId(thread.getId());
threadSet.add(c);
thread.start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
ss.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}服务器端线程处理类
package net.java2000.project.chat;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
public class ChatServerThread implements Runnable {
private Socket s;
private DataInputStream dis = null;
private DataOutputStream dos = null;
boolean bconnected = false;
private long threadId;
public long getThreadId() {
return threadId;
}
public void setThreadId(long threadId) {
this.threadId = threadId;
}
private ChatServer server = null;
ChatServerThread(ChatServer server, Socket s) {
this.server = server;
this.s = s;
try {
dis = new DataInputStream(s.getInputStream());
dos = new DataOutputStream(s.getOutputStream());
bconnected = true;
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (bconnected) {
server.addMessage(dis.readUTF());
}
} catch (EOFException e) {
System.out.println("Client closed!");
} catch (IOException e) {
e.printStackTrace();
} finally {
server.removeThread(this);
try {
if (dis != null)
dis.close();
if (s != null)
s.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
public void sendMessage(String msg) {
try {
dos.writeUTF(msg);
} catch (Exception ex) {
bconnected = false;
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (threadId ^ (threadId >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
final ChatServerThread other = (ChatServerThread) obj;
if (threadId != other.threadId)
return false;
return true;
}
}服务器端信息广播类
package net.java2000.project.chat;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
/**
* 负责群发消息到各个连接
*
* @author Administrator
*
*/
public class ChatServerBroadcast extends Thread {
private Set<ChatServerThread> threadSet;
private BlockingDeque<String> deque;
ChatServerBroadcast(Set<ChatServerThread> threadSet, BlockingDeque<String> deque) {
this.threadSet = threadSet;
this.deque = deque;
}
public void run() {
while (true) {
try {
String msg = deque.takeLast();
System.out.println(">> " + msg);
Iterator<ChatServerThread> it = threadSet.iterator();
while (it.hasNext()) {
it.next().sendMessage(msg);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}